#!/usr/bin/env python3
# -*- coding: utf-8 -*-
################################
# File Name   : safe_rm
# Author      : liyanqing.1987
# Created On  : 2024-04-21 21:28:47
# Description : Safe "rm" script to replace linux "rm" command.
# Version     : V1.2 (2025-03-03)
################################
import os
import re
import sys
import copy
import glob
import json
import socket
import shutil
import datetime
import subprocess

# Exported for the processes started by this script (the running interpreter has already read its own setting).
os.environ['PYTHONUNBUFFERED'] = '1'
CURRENT_SCRIPT = sys.argv[0]

# The working directory may already have been deleted, os.getcwd() raises in that case; fall back to the shell's PWD.
try:
    CWD = os.getcwd()
except OSError:
    CWD = os.environ.get('PWD', '')

HOSTNAME = socket.gethostname()

# Take one timestamp only, so CURRENT_DATE and CURRENT_TIME can never disagree around midnight.
_START_TIME = datetime.datetime.now()
CURRENT_DATE = _START_TIME.strftime('%Y%m%d')
CURRENT_TIME = _START_TIME.strftime('%Y-%m-%d %H:%M:%S')

# USER is the effective user, LOGIN_USER is the first field of "who am i" (the user who owns the login session).
USER = os.popen('whoami').read().strip()
LOGIN_USER = os.popen('who am i').read().strip()

if LOGIN_USER:
    LOGIN_USER = re.sub(r' .*', '', LOGIN_USER)
else:
    # "who am i" prints nothing without a login terminal, use the effective user instead.
    LOGIN_USER = USER


def debug_print(message, debug_level_list=(1, )):
    """
    Print message based on the debug_level which is specified by environment variable "SAFE_RM_DEBUG".
    [export SAFE_RM_DEBUG=1] : show warning message.
    [export SAFE_RM_DEBUG=2] : show command.
    [export SAFE_RM_DEBUG=3] : show configuration.
    [export SAFE_RM_DEBUG=4] : pure debug mode, not really execute rm or mv operations.

    message          : the object to print (converted with str()).
    debug_level_list : the debug levels on which the message is shown, level 4 always shows it.
    """
    # Build a new set instead of appending to the argument, so neither the default value nor the caller's list is modified.
    enabled_level_set = set(debug_level_list) | {4}
    debug_level = os.environ.get('SAFE_RM_DEBUG', '')

    if re.match(r'^[1234]$', debug_level) and (int(debug_level) in enabled_level_set):
        print('\033[1;34m' + str(message) + '\033[0m')


def print_warning(message):
    """
    Show the message on stdout, wrapped in the ANSI escape sequences for bold yellow text.
    """
    colored_message = ''.join(('\033[1;33m', str(message), '\033[0m'))
    print(colored_message)


class SafeRm():
    """
    Build safe "rm" class with protection/recycle/log/alarm mechanism.
    -----------
    Protection: Protect path in self.protected_path_list from being deleted.
    Recycle: Copy the data to be deleted into the recycle bin before it is removed.
    Log: Save logs for all data deletion operations.
    Alarm: Alarm for deletion operation to protected data.
    -----------
    The paths from the command line are kept exactly as typed for every file system check,
    and the system commands are started as argument lists (without a shell), so a special
    character in a file name can neither hide the path from the checks nor be executed.
    """
    # MIME descriptions printed by "file --mime" for a native executable.
    # "x-pie-executable" is what newer "file" versions print for position independent binaries.
    BINARY_MIME_TYPE_LIST = [
        'application/x-executable; charset=binary',
        'application/x-pie-executable; charset=binary',
        'application/x-sharedlib; charset=binary',
        'application/x-mach-binary; charset=binary']

    def __init__(self):
        # [Editable] self.protected_path_list : specify protected paths (absolute path), globally effective.
        self.protected_path_list = [
            '~',
            '/',
            '/bin',
            '/boot',
            '/dev',
            '/etc',
            '/home',
            '/initrd',
            '/lib',
            '/lib64',
            '/mnt',
            '/opt',
            '/proc',
            '/root',
            '/run',
            '/sbin',
            '/srv',
            '/sys',
            '/usr',
            '/var']

        self.protected_path_list = self.extend_list_with_config(self.protected_path_list, self._get_config_file_list('protected'))
        debug_print('[CONFIG] protected_path_list : ' + str(self.protected_path_list), debug_level_list=[3, ])

        # [Editable] self.honeypot_path_list : specify honeypot paths (file name or absolute path), globally effective.
        self.honeypot_path_list = [
            '.honeypot',
            'honeypot']

        # Honeypot items may be plain file names, so they are not filtered by existence.
        self.honeypot_path_list = self.extend_list_with_config(self.honeypot_path_list, self._get_config_file_list('honeypot'), check_exists=False)
        debug_print('[CONFIG] honeypot_path_list : ' + str(self.honeypot_path_list), debug_level_list=[3, ])

        # [Editable] self.recycle_mode : enable recycle bin mechanism, copy the data to be deleted into the recycle bin.
        # [Editable] self.recycle_bin_dir : specify the recycle bin path (will create self.recycle_bin_dir/<USER> for current user).
        # [Editable] self.recycle_path_list : specify recycle paths (absolute path), the following files will be copied into the recycle bin when they are deleted, globally effective.
        self.recycle_mode = True
        self.recycle_bin_dir = ''
        self.recycle_path_list = [
            '~/.alias',
            '~/.bashrc',
            '~/.config',
            '~/.ssh']

        if self.recycle_mode:
            # Fall back to the default recycle bin if no usable directory is specified.
            if (not self.recycle_bin_dir) or (not os.path.exists(self.recycle_bin_dir)):
                self.recycle_bin_dir = '/tmp/safe_rm/recycle_bin'
                self.create_dir(self.recycle_bin_dir, permission=0o1777)

            self.recycle_path_list = self.extend_list_with_config(self.recycle_path_list, self._get_config_file_list('recycle'))

        debug_print('[CONFIG] recycle_mode : ' + str(self.recycle_mode), debug_level_list=[3, ])
        debug_print('[CONFIG] recycle_dir : ' + str(self.recycle_bin_dir), debug_level_list=[3, ])
        debug_print('[CONFIG] recycle_path_list : ' + str(self.recycle_path_list), debug_level_list=[3, ])

        # [Editable] self.log_mode : enable the behavior of saving logs.
        # [Editable] self.log_ignore_user_list : specify a user list, will not save logs for deletion operations of users in the list.
        # [Editable] self.log_dir_list : Specify log dir(s) to save log file, make sure permission is "1777".
        self.log_mode = True
        self.log_ignore_user_list = []
        self.log_dir_list = ['/tmp/safe_rm/log']

        if self.log_mode:
            for log_dir in self.log_dir_list:
                if not os.path.exists(log_dir):
                    self.create_dir(log_dir, permission=0o1777)

        debug_print('[CONFIG] log_dir(s) : ' + str(self.log_dir_list), debug_level_list=[3, ])

        # [Editable] self.alarm_command : specify alarm command. ("<TITLE>", "<MESSAGE>" and "<USER>" are replaceable string.")
        # [Editable] self.alarm_ignore_user_list : specify a user list, no alarms will be sent for deletion operations of users in the list.
        self.alarm_command = ''
        self.alarm_ignore_user_list = []
        debug_print('[CONFIG] alarm_command : ' + str(self.alarm_command), debug_level_list=[3, ])

        # [Editable] self.system_rm : specify system rm.
        self.system_rm = ''

        if not self.system_rm:
            self.get_system_rm()

        debug_print('[CONFIG] system_rm : ' + str(self.system_rm), debug_level_list=[3, ])

        if not self.system_rm:
            debug_print('[WARNING] No system rm is specified.', debug_level_list=[1, 2, 3])
        elif not os.path.exists(self.system_rm):
            debug_print('[WARNING] Specified system rm "' + str(self.system_rm) + '" is missing.', debug_level_list=[1, 2, 3])

    def _get_config_file_list(self, config_type):
        """
        Return the config files for config_type ("protected", "honeypot" or "recycle"): the system wide one, then the one of the current user if HOME is known.
        """
        config_file_list = ['/etc/safe_rm.' + str(config_type)]

        if 'HOME' in os.environ:
            config_file_list.append(str(os.environ['HOME']) + '/.config/safe_rm.' + str(config_type))

        return config_file_list

    @staticmethod
    def _is_virtual_mode():
        """
        Return True in pure debug mode (SAFE_RM_DEBUG=4), where no copy or remove operation is really executed.
        """
        return os.environ.get('SAFE_RM_DEBUG') == '4'

    @staticmethod
    def _is_same_or_inside(parent_path, child_path):
        """
        Return True if child_path is parent_path itself or is located under parent_path.
        It is a plain string comparison on path components, the paths are never interpreted as regular expression.
        """
        parent_path = str(parent_path).rstrip('/')
        child_path = str(child_path).rstrip('/')

        # For "/" the stripped parent is empty, so every absolute path is located under it.
        return (child_path == parent_path) or child_path.startswith(parent_path + '/')

    def _split_arguments(self, argument_list):
        """
        Split command line arguments into (option_list, remove_path_list), the items are kept exactly as they are typed.
        An argument is an option if it starts with "-" and is longer than "-". Everything behind "--" is a path. Empty arguments are dropped.
        """
        option_list = []
        remove_path_list = []
        end_of_options = False

        for argument in argument_list:
            if not argument:
                continue

            if end_of_options:
                remove_path_list.append(argument)
            elif argument == '--':
                end_of_options = True
            elif argument.startswith('-') and (len(argument) > 1):
                option_list.append(argument)
            else:
                remove_path_list.append(argument)

        return (option_list, remove_path_list)

    def _format_command(self, argument_list):
        """
        Join an argument list into one readable string for debug/log messages (it is never executed).
        """
        return ' '.join(self.convert_to_raw_string(str(argument)) for argument in argument_list)

    def _send_operation_alarm(self, command, operation):
        """
        Send the alarm for a command, operation ("skip" or "honeypot") is shown behind the command.
        """
        if USER == LOGIN_USER:
            user_string = str(USER)
        else:
            # The command is executed under another account (su/sudo), show the login user as well.
            user_string = str(USER) + '(' + str(LOGIN_USER) + ')'

        self.send_alarm(message='Time: ' + str(CURRENT_TIME) + '\n User: ' + user_string + '\n Host: ' + str(HOSTNAME) + '\n Cwd: ' + str(CWD) + '\n Command: ' + str(command) + '  (' + str(operation) + ')')

    def run_command(self, command, mystdin=subprocess.PIPE, mystdout=subprocess.PIPE, mystderr=subprocess.PIPE, show=False):
        """
        Run system command with subprocess.Popen, get returncode/stdout/stderr.
        command : a string is executed by the shell, a list of arguments is executed directly (nothing is interpreted by a shell).
        show    : print the captured output, stdout to sys.stdout and stderr to sys.stderr.
        Return (returncode, stdout, stderr), it is (127, '', <error message>) if the command cannot be started.
        """
        try:
            SP = subprocess.Popen(command, shell=isinstance(command, str), stdin=mystdin, stdout=mystdout, stderr=mystderr)
        except OSError as error:
            # For example the executable of an argument list is missing.
            if show:
                print(str(error), file=sys.stderr)

            return (127, '', str(error))

        (stdout, stderr) = SP.communicate()

        # A stream which is not captured comes back as None, and the output may contain bytes which are not valid UTF-8.
        stdout = '' if stdout is None else str(stdout, 'utf-8', errors='replace').strip()
        stderr = '' if stderr is None else str(stderr, 'utf-8', errors='replace').strip()

        if self.system_rm and os.path.exists(self.system_rm):
            # Show the name of this script instead of the real rm in the messages (literal replacement, not a regular expression).
            stdout = stdout.replace(self.system_rm, CURRENT_SCRIPT)
            stderr = stderr.replace(self.system_rm, CURRENT_SCRIPT)

        if show:
            if stdout:
                print(stdout)

            if stderr:
                print(stderr, file=sys.stderr)

        return (SP.returncode, stdout, stderr)

    def extend_list_with_config(self, path_list, config_file_list, check_exists=True):
        """
        Extend path_list with config_file(s) on config_file_list.
        Blank lines and "#" comment lines in the config file(s) are ignored, a leading "~" is replaced with $HOME, items with "*" are expanded by glob.
        Return a new sorted list without duplicates (path_list itself is not modified), only existing paths are kept if check_exists is True.
        """
        # Work on a copy, the list of the caller must not be changed.
        raw_path_list = list(path_list)

        # Parse config file(s).
        for config_file in config_file_list:
            if not os.path.exists(config_file):
                continue

            try:
                with open(config_file, 'r') as CF:
                    for line in CF:
                        line = line.strip()

                        if line and (not line.startswith('#')):
                            raw_path_list.append(line)
            except (OSError, UnicodeError):
                debug_print('[WARNING] Failed on opening config file ' + str(config_file) + ' for read.', debug_level_list=[1, 2, 3])

        # Process for "~" and "*".
        home = os.environ.get('HOME')
        path_set = set()

        for path_item in raw_path_list:
            path_item = path_item.strip()

            # Only the leading "~" means the home directory, any other "~" belongs to the file name.
            if path_item.startswith('~') and (home is not None):
                path_item = home + path_item[1:]

            if '*' in path_item:
                path_set.update(glob.glob(path_item))
            else:
                path_set.add(path_item)

        new_path_list = sorted(path_set)

        if check_exists:
            new_path_list = [path_item for path_item in new_path_list if os.path.exists(path_item)]

        return new_path_list

    def create_dir(self, dir_path, permission=0o1777):
        """
        Create dir with specified permission, do nothing if the directory already exists.
        A failure is only reported as debug message.
        """
        if os.path.isdir(dir_path):
            return

        try:
            debug_print('[COMMAND] mkdir ' + str(dir_path), debug_level_list=[2, 3])
            os.makedirs(dir_path)
            # os.makedirs is affected by umask, so the permission is set explicitly (on the last directory only).
            debug_print('[COMMAND] chmod ' + oct(permission)[2:] + ' ' + str(dir_path), debug_level_list=[2, 3])
            os.chmod(dir_path, permission)
        except OSError:
            debug_print('[WARNING] Failed on creating directory ' + str(dir_path) + ' with permission "' + oct(permission)[2:] + '".', debug_level_list=[1, 2, 3])

    def get_system_rm(self):
        """
        Get real system rm if self.system_rm is not specified.
        The first candidate which is not this script itself and which is reported as binary executable by "file --mime" is saved into self.system_rm.
        """
        real_current_script = os.path.realpath(CURRENT_SCRIPT)
        system_rm_list = ['/usr/bin/system_rm', '/usr/bin/rm', '/bin/system_rm', '/bin/rm']

        for system_rm in system_rm_list:
            real_system_rm = os.path.realpath(system_rm)

            # Skip the candidate which is (a link to) this script, and the candidate which does not exist.
            if (real_system_rm == real_current_script) or (not os.path.exists(real_system_rm)):
                continue

            (return_code, stdout, stderr) = self.run_command(['/usr/bin/file', '--mime', real_system_rm])

            if any(mime_type in stdout for mime_type in self.BINARY_MIME_TYPE_LIST):
                self.system_rm = real_system_rm
                break

    def save_log(self, message, message_level='Info'):
        """
        Save message into user log file (<log_dir>/<USER>/<CURRENT_DATE>) as one json line, for every log_dir on self.log_dir_list.
        Nothing is saved if self.log_mode is disabled or USER is on self.log_ignore_user_list.
        """
        if (not self.log_mode) or (USER in self.log_ignore_user_list):
            return

        message = str(message)

        # Mark the message once, no matter how many log dirs are configured.
        if self._is_virtual_mode():
            message = message + '  (Virtual Operation)'

        log_dic = {"time": CURRENT_TIME, "message_level": message_level, "user": USER, "login_user": LOGIN_USER, "host": HOSTNAME, "cwd": CWD, "message": message}
        log_line = str(json.dumps(log_dic, ensure_ascii=False)) + '\n'

        for log_dir in self.log_dir_list:
            log_user_dir = str(log_dir) + '/' + str(USER)
            self.create_dir(log_user_dir, permission=0o700)
            log_file = str(log_user_dir) + '/' + str(CURRENT_DATE)

            try:
                with open(log_file, 'a') as LF:
                    LF.write(log_line)
            except (OSError, UnicodeError):
                debug_print('[WARNING] Failed on saving message into log file "' + str(log_file) + '".', debug_level_list=[1, 2, 3])

    def convert_to_raw_string(self, input_string):
        """
        Return input_string with a backslash in front of control characters, quotes, backslashes, parentheses and spaces.
        The result is for display only: it is not a complete shell escaping and it must not be used for file system checks.
        """
        escape_char_list = ['\a', '\b', '\f', '\n', '\r', '\t', '\v', '\'', '\"', '\\', r'(', r')', r' ']

        return ''.join(('\\' + char) if (char in escape_char_list) else char for char in input_string)

    def parse_rm_command(self):
        """
        Get arguments and paths from rm command.
        Return (rm_command, rm_arguments, remove_path_list): the full command line, the options joined into one string, and the paths.
        The options and paths are backslash-escaped (see convert_to_raw_string), so they are only suitable for messages; run() works on the unescaped arguments.
        """
        rm_command = ' '.join(sys.argv)
        (option_list, path_list) = self._split_arguments(sys.argv[1:])
        rm_arguments = ' '.join(self.convert_to_raw_string(option) for option in option_list)
        remove_path_list = [self.convert_to_raw_string(path_item) for path_item in path_list]

        return rm_command, rm_arguments, remove_path_list

    def check_protection(self, remove_path, command):
        """
        Check the remove_path is protected or not.
        remove_path is protected if it is a protected path itself, or if a protected path is located under it.
        Return True (after warning, log and alarm) if it is protected, so the caller must skip it; return False otherwise.
        """
        if not os.path.exists(remove_path):
            return False

        absolute_remove_path = os.path.abspath(remove_path)

        for protected_path in self.protected_path_list:
            if not self._is_same_or_inside(absolute_remove_path, protected_path):
                continue

            if absolute_remove_path.rstrip('/') == str(protected_path).rstrip('/'):
                warning_message = '*Warning*: Cannot remove protected path "' + str(remove_path) + '", skip!'
            else:
                warning_message = '*Warning*: Cannot remove path "' + str(remove_path) + '", there are protected data under it, skip!'

            print_warning(warning_message)
            self.save_log(warning_message, message_level='Warning')
            self._send_operation_alarm(command, 'skip')

            return True

        return False

    def check_honeypot(self, remove_path, command):
        """
        Check the remove_path is honeypot or not, save a warning log and send alarm if it is (the removal itself is not blocked).
        A honeypot file name matches if remove_path has this name or directly contains an item with this name.
        A honeypot absolute path matches if remove_path is this path or a directory above it.
        Only one log/alarm is generated for one remove_path.
        """
        if not os.path.exists(remove_path):
            return

        absolute_remove_path = os.path.abspath(remove_path)

        for honeypot_path in self.honeypot_path_list:
            honeypot_path = str(honeypot_path)

            if not honeypot_path:
                continue

            if os.path.isabs(honeypot_path):
                honeypot_match = self._is_same_or_inside(absolute_remove_path, honeypot_path) and os.path.exists(honeypot_path)
            else:
                # Literal comparison, "." in a name like ".honeypot" must not match any character.
                honeypot_match = absolute_remove_path.endswith('/' + honeypot_path) or os.path.exists(absolute_remove_path + '/' + honeypot_path)

            if honeypot_match:
                self.save_log('*Warning*: Honeypot data "' + str(absolute_remove_path) + '" is removed, notice!', message_level='Warning')
                self._send_operation_alarm(command, 'honeypot')
                return

    def send_alarm(self, title='', message='', user=''):
        """
        Send alarm message with self.alarm_command.
        "<TITLE>", "<MESSAGE>" and "<USER>" on self.alarm_command are replaced with the specified values, a placeholder without value is kept as it is.
        Nothing is sent if self.alarm_command is empty or USER is on self.alarm_ignore_user_list.
        """
        if (not self.alarm_command) or (USER in self.alarm_ignore_user_list):
            return

        # Replace step by step on the same string, so all the specified values are kept.
        alarm_command = str(self.alarm_command)

        for (placeholder, value) in (('<TITLE>', title), ('<MESSAGE>', message), ('<USER>', user)):
            if value:
                alarm_command = alarm_command.replace(placeholder, str(value))

        debug_print('[COMMAND] ' + str(alarm_command), debug_level_list=[2, 3])
        self.save_log('[alarm_command=' + str(alarm_command) + ']', message_level='Info')
        # NOTE(review): the alarm command is executed by the shell and the message contains the command line of the user,
        # so the quoting of the placeholders inside self.alarm_command decides if special characters are safe - confirm when alarm_command is configured.
        subprocess.Popen(alarm_command, shell=True)

    def move_to_recycle(self, remove_path):
        """
        Copy remove_path into user_recycle_bin_dir (the removal itself is done by the caller afterwards).
        Only a path which is on self.recycle_path_list, or which contains an item of it, is copied.
        An existing copy is never overwritten, the new copy gets a timestamp suffix instead.
        """
        # Check if remove_path is already defined in self.recycle_path_list.
        absolute_remove_path = os.path.abspath(remove_path)

        if not any(self._is_same_or_inside(absolute_remove_path, recycle_path) for recycle_path in self.recycle_path_list):
            return

        # Generate user_recycle_bin_dir.
        user_recycle_bin_dir = str(self.recycle_bin_dir) + '/' + str(USER)
        self.create_dir(user_recycle_bin_dir, permission=0o700)

        # Copy remove_path into user_recycle_bin_dir, the name comes from the absolute path so "dir/" and "." work as well.
        target_path = user_recycle_bin_dir + '/' + os.path.basename(absolute_remove_path)

        if os.path.lexists(target_path):
            target_path = target_path + '.' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')

        # "-r" is required, several recycle paths (~/.config, ~/.ssh) are directories.
        recycle_command = ['/bin/cp', '-rf', '--', str(remove_path), target_path]
        recycle_command_string = self._format_command(recycle_command)

        debug_print('[COMMAND] ' + recycle_command_string, debug_level_list=[2, 3])
        self.save_log('[recycle_command=' + recycle_command_string + ']', message_level='Info')

        if self._is_virtual_mode():
            return

        (return_code, stdout, stderr) = self.run_command(recycle_command)

        if return_code != 0:
            # The data is removed by the caller anyway, so the user must know that no copy exists.
            warning_message = '*Warning*: Failed on copying "' + str(remove_path) + '" into recycle bin "' + str(user_recycle_bin_dir) + '".'
            print_warning(warning_message)
            self.save_log(warning_message + ' ' + str(stderr), message_level='Warning')

    def rm(self, remove_path):
        """
        Remove remove_path with python function os.remove() and shutil.rmtree(), it is the fallback if no system rm is available.
        A directory is removed with its content; a file, a link (also a broken one) or a special file is removed itself.
        Return 0 if remove_path is removed or does not exist, return 1 (after printing the reason on stderr) if the removal fails.
        """
        # os.path.lexists is also True for a broken link.
        if not os.path.lexists(remove_path):
            return 0

        try:
            if os.path.isdir(remove_path) and (not os.path.islink(remove_path)):
                shutil.rmtree(remove_path)
            else:
                os.remove(remove_path)
        except PermissionError:
            print(f'rm: cannot remove "{remove_path}": Permission denied.', file=sys.stderr)
            return 1
        except OSError as err:
            print(f'rm: failed on removing "{remove_path}": {err}', file=sys.stderr)
            return 1

        return 0

    def run(self):
        """
        Main function.
        Every path on the command line goes through protection check, honeypot check and recycle, then it is removed with the system rm
        (or with self.rm() if no system rm is available), a protected path is skipped.
        Return the exit status: 0 on success, 1 if an operand is missing, a path is skipped or a removal fails.
        """
        rm_command = ' '.join(sys.argv)
        (option_list, remove_path_list) = self._split_arguments(sys.argv[1:])
        self.save_log('[command=' + str(rm_command) + ']', message_level='Info')

        if (not option_list) and (not remove_path_list):
            print("rm: missing operand\nTry 'rm --help' for more information.", file=sys.stderr)
            return 1

        if not remove_path_list:
            # Options only (--help, --version ...), it is fully handled by the system rm.
            if self.system_rm and os.path.exists(self.system_rm):
                (return_code, stdout, stderr) = self.run_command([self.system_rm] + option_list, show=True)
                return 0 if (return_code == 0) else 1

            return 0

        exit_code = 0

        for remove_path in remove_path_list:
            if self.check_protection(remove_path, rm_command):
                exit_code = 1
                continue

            # Check honeypot.
            self.check_honeypot(remove_path, rm_command)

            # Execute recycle mode.
            if self.recycle_mode and self.recycle_bin_dir and os.path.exists(self.recycle_bin_dir) and self.recycle_path_list:
                self.move_to_recycle(remove_path)

            # Execute remove.
            if self.system_rm and os.path.exists(self.system_rm):
                # "--" makes sure that a path which starts with "-" is not taken as option by the system rm.
                command = [self.system_rm] + option_list + ['--', remove_path]
                debug_print('[COMMAND] ' + self._format_command(command), debug_level_list=[2, 3])

                if not self._is_virtual_mode():
                    (return_code, stdout, stderr) = self.run_command(command, show=True)

                    if return_code != 0:
                        exit_code = 1
            else:
                debug_print('[COMMAND] rm -rf ' + str(remove_path), debug_level_list=[2, 3])

                if not self._is_virtual_mode():
                    if self.rm(remove_path) != 0:
                        exit_code = 1

        return exit_code


################
# Main Process #
################
def main():
    """
    Build the SafeRm instance, execute the rm command line and exit with the status of the run.
    """
    my_safe_rm = SafeRm()
    # sys.exit(None) exits with 0, so a run() without return value keeps the exit status 0.
    sys.exit(my_safe_rm.run())


if __name__ == '__main__':
    main()
